package nettyRPC.server;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import nettyRPC.protocol.RpcDecoder;
import nettyRPC.protocol.RpcEncoder;
import nettyRPC.protocol.RpcRequest;
import nettyRPC.protocol.RpcResponse;
import nettyRPC.registry.ServiceRegistry;
import org.apache.commons.collections4.MapUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * Netty-based RPC server that is wired up by Spring.
 *
 * <p>When the application context is injected, every bean annotated with {@link RpcService}
 * is collected into a map keyed by the name of the interface given in the annotation.
 * Once Spring has set all properties, the server binds to {@code serverAddress}
 * ({@code host:port}), optionally registers that address with a {@link ServiceRegistry},
 * and then blocks until the server channel is closed.
 *
 * @program: NettyRpc-master
 * @author: tkk fendoukaoziji
 * @create: 2019-08-07 17:35
 **/
public class RpcServer implements ApplicationContextAware, InitializingBean {
    private static final Logger logger = LoggerFactory.getLogger(RpcServer.class);

    /** Core and maximum size of the shared task pool used by {@link #submit(Runnable)}. */
    private static final int TASK_POOL_SIZE = 16;
    /** Keep-alive time, in seconds, passed to the shared task pool. */
    private static final long TASK_POOL_KEEP_ALIVE_SECONDS = 600L;
    /** Capacity of the bounded queue backing the shared task pool. */
    private static final int TASK_QUEUE_CAPACITY = 65536;
    /** Maximum frame length accepted by the length-field frame decoder. */
    private static final int MAX_FRAME_LENGTH = 65536;

    /**
     * Lazily created pool shared by all server instances. Declared {@code volatile} so that the
     * double-checked initialisation in {@link #submit(Runnable)} publishes the pool safely.
     */
    private static volatile ThreadPoolExecutor threadPoolExecutor;

    private final String serverAddress;
    private final ServiceRegistry serviceRegistry;
    /** Mapping from service interface name to the bean that implements it. */
    private final Map<String, Object> handlerMap = new HashMap<>();
    private EventLoopGroup bossGroup = null;
    private EventLoopGroup workerGroup = null;

    /**
     * Creates a server without a service registry.
     *
     * @param serverAddress address to bind, in {@code host:port} form
     */
    public RpcServer(String serverAddress) {
        this(serverAddress, null);
    }

    /**
     * Creates a server that announces itself to the given registry after binding.
     *
     * @param serverAddress   address to bind, in {@code host:port} form
     * @param serviceRegistry registry to register the address with; may be {@code null}
     */
    public RpcServer(String serverAddress, ServiceRegistry serviceRegistry) {
        this.serverAddress = serverAddress;
        this.serviceRegistry = serviceRegistry;
    }

    /**
     * Starts the server. Note that this call does not return until the server channel
     * has been closed, so the calling thread is blocked for the lifetime of the server.
     */
    @Override
    public void afterPropertiesSet() throws Exception {
        start();
    }

    /** Requests a graceful shutdown of both event loop groups, if they have been created. */
    public void stop() {
        shutdownEventLoops();
    }

    /**
     * Submits a task to the shared worker pool, creating the pool on first use.
     *
     * <p>The pool has a fixed size and a bounded queue; it is built without an explicit
     * rejection handler, so the JDK default policy applies when the queue is full.
     *
     * @param task the work to run asynchronously
     */
    public static void submit(Runnable task) {
        // Lazy singleton: read the volatile field once, lock only while it is still unset.
        ThreadPoolExecutor executor = threadPoolExecutor;
        if (executor == null) {
            synchronized (RpcServer.class) {
                executor = threadPoolExecutor;
                if (executor == null) {
                    executor = new ThreadPoolExecutor(
                            TASK_POOL_SIZE, TASK_POOL_SIZE,
                            TASK_POOL_KEEP_ALIVE_SECONDS, TimeUnit.SECONDS,
                            new ArrayBlockingQueue<Runnable>(TASK_QUEUE_CAPACITY));
                    threadPoolExecutor = executor;
                }
            }
        }
        executor.submit(task);
    }

    /**
     * Binds the server, registers its address and waits for the server channel to close.
     * Does nothing if either event loop group already exists.
     *
     * @throws InterruptedException     if the thread is interrupted while binding or waiting
     * @throws IllegalArgumentException if {@code serverAddress} is not in {@code host:port} form
     */
    private void start() throws InterruptedException {
        if (bossGroup != null || workerGroup != null) {
            return;
        }

        // Validate the address before allocating any event loop threads.
        if (serverAddress == null) {
            throw new IllegalArgumentException("serverAddress must not be null");
        }
        String[] addressParts = serverAddress.split(":");
        if (addressParts.length < 2) {
            throw new IllegalArgumentException(
                    "serverAddress must be in host:port form, got: " + serverAddress);
        }
        String host = addressParts[0];
        int port = Integer.parseInt(addressParts[1]);

        bossGroup = new NioEventLoopGroup();
        workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel channel) throws Exception {
                            channel.pipeline()
                                    // 4-byte length field at offset 0, nothing adjusted or stripped.
                                    .addLast(new LengthFieldBasedFrameDecoder(MAX_FRAME_LENGTH, 0, 4, 0, 0))
                                    .addLast(new RpcDecoder(RpcRequest.class))
                                    .addLast(new RpcEncoder(RpcResponse.class))
                                    .addLast(new RpcHandler(handlerMap));
                        }
                    })
                    .option(ChannelOption.SO_BACKLOG, 128)
                    .childOption(ChannelOption.SO_KEEPALIVE, true);

            ChannelFuture future = bootstrap.bind(host, port).sync();
            logger.info("server started on port{},", port);
            if (serviceRegistry != null) {
                serviceRegistry.register(serverAddress);
            }
            // Block until the server channel is closed.
            future.channel().closeFuture().sync();
        } finally {
            // Release the event loop threads whether the bind failed, registration failed,
            // the wait was interrupted, or the channel closed normally.
            shutdownEventLoops();
        }
    }

    /** Gracefully shuts down whichever event loop groups exist. */
    private void shutdownEventLoops() {
        if (bossGroup != null) {
            bossGroup.shutdownGracefully();
        }
        if (workerGroup != null) {
            workerGroup.shutdownGracefully();
        }
    }

    /**
     * Collects every bean annotated with {@link RpcService} and indexes it by the name of the
     * interface declared in the annotation's {@code value()}.
     *
     * @param ctx the Spring application context to scan
     */
    @Override
    public void setApplicationContext(ApplicationContext ctx) throws BeansException {
        Map<String, Object> serviceBeanMap = ctx.getBeansWithAnnotation(RpcService.class);
        if (MapUtils.isNotEmpty(serviceBeanMap)) {
            for (Object serviceBean : serviceBeanMap.values()) {
                String interfaceName = serviceBean.getClass().getAnnotation(RpcService.class).value().getName();
                logger.info("Loading service:{}", interfaceName);
                handlerMap.put(interfaceName, serviceBean);
            }
        }
    }
}






















